package main

import (
	"context"
	"fmt"
	"github.com/go-kratos/kratos/v2/log"
	"github.com/segmentio/kafka-go"
	"slices"
)

func main() {
	// Kafka broker地址
	brokers := []string{"localhost:9092"}

	// 主题和分区信息
	topic := "my-topic"
	partitions := []int{2, 3} // 要消费的分区列表

	// 消费者组ID
	groupID := "my-group"

	// 创建一个reader，指定GroupID，从 topic-A 消费消息
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		GroupID:  groupID, // 指定消费者组id
		Topic:    topic,
		MaxBytes: 10e6, // 10MB
	})

	// 接收消息
	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			// Notice log...
			fmt.Println("errReadMessage: ", err)
			break
		}

		// Notice 消费指定分区数据...
		if !slices.Contains(partitions, m.Partition) {
			fmt.Println("不是要消费的分区!......", m.Partition, m.Offset)
			continue
		}

		// TODO 做一些业务逻辑的处理！！！
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}

	// 程序退出前关闭Reader
	if err := r.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}

}
